/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping
 * similar to that of {@code ConcurrentHashMap} in a reusable form, and extends it for
 * semaphores and read-write locks. Conceptually, lock striping is the technique of dividing a lock
 * into many <i>stripes</i>, increasing the granularity of a single lock and allowing independent
 * operations to lock different stripes and proceed concurrently, instead of creating contention
 * for a single lock.
 *
 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)}
 * (assuming {@link Object#hashCode()} is correctly implemented for the keys). Note
 * that if {@code key1} is <strong>not</strong> equal to {@code key2}, it is <strong>not</strong>
 * guaranteed that {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless
 * be mapped to the same lock. The lower the number of stripes, the higher the probability of this
 * happening.
 *
 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>},
 * and {@code Striped<ReadWriteLock>}. For each type, two implementations are offered:
 * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak}
 * {@code Striped<Lock>}, {@linkplain #semaphore(int, int) strong} and {@linkplain
 * #lazyWeakSemaphore(int, int) weak} {@code Striped<Semaphore>}, and {@linkplain
 * #readWriteLock(int) strong} and {@linkplain #lazyWeakReadWriteLock(int) weak}
 * {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all stripes (locks/semaphores) are
 * initialized eagerly, and are not reclaimed unless {@code Striped} itself is reclaimable.
 * <i>Weak</i> means that locks/semaphores are created lazily, and they are allowed to be reclaimed
 * if nobody is holding on to them. This is useful, for example, if one wants to create a {@code
 * Striped<Lock>} of many locks, but worries that in most cases only a small portion of these
 * would be in use.
 *
 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock
 * for all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of
 * choosing either of these extremes, {@code Striped} allows the user to trade between required
 * concurrency and memory footprint. For example, if a set of tasks are CPU-bound, one could easily
 * create a very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes,
 * instead of possibly thousands of locks which could be created in a {@code Map<K, Lock>}
 * structure.
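 *
 * <p>A minimal usage sketch (the {@code accountId} key and {@code updateBalance} method are
 * hypothetical, for illustration only):
 *
 * <pre>   {@code
 *   Striped<Lock> stripedLock = Striped.lock(64);
 *
 *   void updateBalance(String accountId) {
 *     Lock lock = stripedLock.get(accountId);  // equal keys always yield the same lock
 *     lock.lock();
 *     try {
 *       // mutate state associated with accountId
 *     } finally {
 *       lock.unlock();
 *     }
 *   }}</pre>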
 *
 * @author Dimitris Andreou
 * @since 13.0
 */
@Beta
public abstract class Striped<L> {
  /**
   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
   * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As
   * always, if many stripes are in use, a non-lazy striped makes more sense.)
   */
  private static final int LARGE_LAZY_CUTOFF = 1024;

  private Striped() {}

  /**
   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
   * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
   *
   * @param key an arbitrary, non-null key
   * @return the stripe that the passed key corresponds to
   */
  public abstract L get(Object key);

  /**
   * Returns the stripe at the specified index. Valid indexes are 0, inclusive, to
   * {@code size()}, exclusive.
   *
   * @param index the index of the stripe to return; must be in {@code [0...size())}
   * @return the stripe at the specified index
   */
  public abstract L getAt(int index);

  /**
   * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
   */
  abstract int indexFor(Object key);

  /**
   * Returns the total number of stripes in this instance.
   */
  public abstract int size();

  /**
   * Returns the stripes that correspond to the passed objects, in ascending (as per
   * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
   * by this method are guaranteed not to deadlock one another.
   *
   * <p>Note that using a {@code Striped<L>} with relatively few stripes, and
   * {@code bulkGet(keys)} with a relatively large number of keys, can cause an excessive number
   * of shared stripes (much like the birthday paradox, where far fewer birthdays than anticipated
   * are needed for a pair of them to match). Please consider carefully the implications of the
   * number of stripes, the intended concurrency level, and the typical number of keys used in a
   * {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
   * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
   * collisions.
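   *
   * <p>For example, a sketch of the lock-all-then-operate pattern (assuming a
   * {@code Striped<Lock>} named {@code stripedLock}; {@code operation()} stands in for
   * arbitrary user code):
   *
   * <pre>   {@code
   *   Iterable<Lock> locks = stripedLock.bulkGet(keys);
   *   for (Lock lock : locks) {
   *     lock.lock();  // acquired in index order, so no lock-ordering deadlock
   *   }
   *   try {
   *     operation();
   *   } finally {
   *     for (Lock lock : locks) {
   *       lock.unlock();
   *     }
   *   }}</pre>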
   *
   * @param keys arbitrary non-null keys
   * @return the stripes corresponding to the objects (one for each object, derived by delegating
   *     to {@link #get(Object)}; may contain duplicates), in increasing index order
   */
  public Iterable<L> bulkGet(Iterable<?> keys) {
    // Initially using the array to store the keys, then reusing it to store the respective L's
    final Object[] array = Iterables.toArray(keys, Object.class);
    if (array.length == 0) {
      return ImmutableList.of();
    }
    int[] stripes = new int[array.length];
    for (int i = 0; i < array.length; i++) {
      stripes[i] = indexFor(array[i]);
    }
    Arrays.sort(stripes);
    // optimize for runs of identical stripes
    int previousStripe = stripes[0];
    array[0] = getAt(previousStripe);
    for (int i = 1; i < array.length; i++) {
      int currentStripe = stripes[i];
      if (currentStripe == previousStripe) {
        array[i] = array[i - 1];
      } else {
        array[i] = getAt(currentStripe);
        previousStripe = currentStripe;
      }
    }
    /*
     * Note that the returned Iterable holds references to the returned stripes, to avoid
     * error-prone code like:
     *
     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...);
     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
     * for (Lock lock : locks) {
     *   lock.lock();
     * }
     * operation();
     * for (Lock lock : locks) {
     *   lock.unlock();
     * }
     *
     * If we only held the int[] stripes, translating it on the fly to L's, the original locks
     * might be garbage collected after locking them, ending up in a huge mess.
     */
    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    List<L> asList = (List<L>) Arrays.asList(array);
    return Collections.unmodifiableList(asList);
  }

  // Static factories

  /**
   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks.
   * Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lock(int stripes) {
    return new CompactStriped<Lock>(stripes, new Supplier<Lock>() {
      @Override public Lock get() {
        return new PaddedLock();
      }
    });
  }

  /**
   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks.
   * Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lazyWeakLock(int stripes) {
    return lazy(stripes, new Supplier<Lock>() {
      @Override public Lock get() {
        return new ReentrantLock(false);
      }
    });
  }

  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    return stripes < LARGE_LAZY_CUTOFF
        ? new SmallLazyStriped<L>(stripes, supplier)
        : new LargeLazyStriped<L>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
   * with the specified number of permits.
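   *
   * <p>For instance, a sketch that bounds concurrent work per key (the
   * {@code expensiveOperation()} call is hypothetical):
   *
   * <pre>   {@code
   *   Striped<Semaphore> semaphores = Striped.semaphore(256, 3);
   *
   *   void run(String key) throws InterruptedException {
   *     Semaphore semaphore = semaphores.get(key);
   *     semaphore.acquire();  // at most 3 holders per stripe at a time
   *     try {
   *       expensiveOperation();
   *     } finally {
   *       semaphore.release();
   *     }
   *   }}</pre>
   *
   * <p>Note that keys which hash to the same stripe share the same permits.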
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    return new CompactStriped<Semaphore>(stripes, new Supplier<Semaphore>() {
      @Override public Semaphore get() {
        return new PaddedSemaphore(permits);
      }
    });
  }

  /**
   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    return lazy(stripes, new Supplier<Semaphore>() {
      @Override public Semaphore get() {
        return new Semaphore(permits, false);
      }
    });
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
   * read-write locks. Every lock is reentrant.
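   *
   * <p>A sketch of per-key read guarding (the {@code cache} map is hypothetical):
   *
   * <pre>   {@code
   *   Striped<ReadWriteLock> rwLocks = Striped.readWriteLock(32);
   *
   *   String read(String key) {
   *     Lock readLock = rwLocks.get(key).readLock();
   *     readLock.lock();
   *     try {
   *       return cache.get(key);
   *     } finally {
   *       readLock.unlock();
   *     }
   *   }}</pre>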
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
   * read-write locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    return lazy(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  // ReentrantReadWriteLock is large enough to make padding probably unnecessary
  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
        @Override public ReadWriteLock get() {
          return new ReentrantReadWriteLock();
        }
      };

  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    /** Capacity (power of two) minus one, for fast mod evaluation */
    final int mask;

    PowerOfTwoStriped(int stripes) {
      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    }

    @Override final int indexFor(Object key) {
      int hash = smear(key.hashCode());
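      // mask == 2^k - 1, so the bitwise AND below is equivalent to (hash mod 2^k);
      // e.g. a request for 10 stripes rounds up to 16, giving mask == 15 (binary 1111)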
      return hash & mask;
    }

    @Override public final L get(Object key) {
      return getAt(indexFor(key));
    }
  }

  /**
   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
   * eagerly initialized.
   */
  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    /** Size is a power of two. */
    private final Object[] array;

    private CompactStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30");

      this.array = new Object[mask + 1];
      for (int i = 0; i < array.length; i++) {
        array[i] = supplier.get();
      }
    }

    @SuppressWarnings("unchecked") // we only put L's in the array
    @Override public L getAt(int index) {
      return (L) array[index];
    }

    @Override public int size() {
      return array.length;
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using an
   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    final AtomicReferenceArray<ArrayReference<? extends L>> locks;
    final Supplier<L> supplier;
    final int size;
    final ReferenceQueue<L> queue = new ReferenceQueue<L>();

    SmallLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.locks = new AtomicReferenceArray<ArrayReference<? extends L>>(size);
      this.supplier = supplier;
    }

    @Override public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      ArrayReference<? extends L> existingRef = locks.get(index);
      L existing = existingRef == null ? null : existingRef.get();
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
      while (!locks.compareAndSet(index, existingRef, newRef)) {
        // we raced, we need to re-read and try again
        existingRef = locks.get(index);
        existing = existingRef == null ? null : existingRef.get();
        if (existing != null) {
          return existing;
        }
      }
      drainQueue();
      return created;
    }

    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
    // in the array. We could skip this if we decide we don't care about holding on to Reference
    // objects indefinitely.
    private void drainQueue() {
      Reference<? extends L> ref;
      while ((ref = queue.poll()) != null) {
        // We only ever register ArrayReferences with the queue so this is always safe.
        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
        // arrayRef will be out of the array after this step.
        locks.compareAndSet(arrayRef.index, arrayRef, null);
      }
    }

    @Override public int size() {
      return size;
    }

    private static final class ArrayReference<L> extends WeakReference<L> {
      final int index;

      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
        super(referent, queue);
        this.index = index;
      }
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    final ConcurrentMap<Integer, L> locks;
    final Supplier<L> supplier;
    final int size;

    LargeLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.supplier = supplier;
      this.locks = new MapMaker().weakValues().makeMap();
    }

    @Override public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      L existing = locks.get(index);
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      existing = locks.putIfAbsent(index, created);
      return MoreObjects.firstNonNull(existing, created);
    }

    @Override public int size() {
      return size;
    }
  }

  /**
   * A bit mask where all bits are set.
   */
  private static final int ALL_SET = ~0;

  private static int ceilToPowerOfTwo(int x) {
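    // e.g. ceilToPowerOfTwo(10) == 16 and ceilToPowerOfTwo(16) == 16; callers ensure
    // 0 < x <= Ints.MAX_POWER_OF_TWO, so the shift cannot overflow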
    return 1 << IntMath.log2(x, RoundingMode.CEILING);
  }

  /*
   * This method was written by Doug Lea with assistance from members of JCP
   * JSR-166 Expert Group and released to the public domain, as explained at
   * http://creativecommons.org/licenses/publicdomain
   *
   * As of 2010/06/11, this method is identical to the (package private) hash
   * method in OpenJDK 7's java.util.HashMap class.
   */
  // Copied from java/com/google/common/collect/Hashing.java
  private static int smear(int hashCode) {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }

  private static class PaddedLock extends ReentrantLock {
    /*
     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add
     * a fourth long here, to minimize chance of interference between consecutive locks,
     * but I couldn't observe any benefit from that.
     */
    @SuppressWarnings("unused")
    long q1, q2, q3;

    PaddedLock() {
      super(false);
    }
  }

  private static class PaddedSemaphore extends Semaphore {
    // See PaddedLock comment
    @SuppressWarnings("unused")
    long q1, q2, q3;

    PaddedSemaphore(int permits) {
      super(permits, false);
    }
  }
}